package com.demo.join;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 *
 */
public class TriggerCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9090);
        dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] tokens = value.split("\\W+");
                for (String token : tokens) {
                    if (token != null && token.length() > 0) {
                        out.collect(new Tuple2<String, Integer>(token, 1));
                    }
                }
            }
        }).keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                //.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(3)))//窗口每统计一次当前计算结果
                .sum(1)// count求和
                .map(new MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>>() {
                    @Override
                    public Tuple3<String, String, Integer> map(Tuple2<String, Integer> value)
                            throws Exception {
                        // TODO Auto-generated method stub
                        DateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        String s = format2.format(new Date());
                        return new Tuple3<String, String, Integer>(value.f0, s, value.f1);
                    }
                }).print("trigger");
        env.execute("Java WordCount from SocketTextStream Example");
    }
}
